package cn.doitedu

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.spark.sql.EsSparkSQL

/**
 * Loads the user-profile tags of one Hive partition into Elasticsearch.
 *
 * The job reads three Hive tag tables (`tmp.profile_tag01` .. `tmp.profile_tag03`) for a single
 * `dt` partition and merges them by `guid`. Every guid present in at least one table yields one
 * row; tags absent from a given table come out as null via the left joins. The merged rows are
 * written to an Elasticsearch index, using `guid` as the document id.
 *
 * Usage: `ProfileEsLoader <indexName> <partitionDate>`
 *  - `indexName`: target Elasticsearch index name
 *  - `partitionDate`: value of the Hive `dt` partition column to load
 */
object ProfileEsLoader {

  /**
   * Whitelist for the partition argument. The value is spliced into SQL text, so anything
   * other than letters, digits, `_` and `-` (quotes in particular) is rejected up front.
   */
  private val PartitionPattern = "[0-9A-Za-z_-]+"

  /** One source tag table: the temp view it is registered as, the Hive table, and its tag columns. */
  private final case class TagGroup(view: String, table: String, columns: Seq[String])

  /** Builds the tag column names `tag_NN_01` .. `tag_NN_04` for tag group number `NN`. */
  private def tagColumns(group: Int): Seq[String] =
    (1 to 4).map(i => f"tag_$group%02d_$i%02d")

  /** The tag tables merged by this job, in output column order. */
  private val TagGroups: Seq[TagGroup] = Seq(
    TagGroup("v1", "tmp.profile_tag01", tagColumns(1)),
    TagGroup("v2", "tmp.profile_tag02", tagColumns(2)),
    TagGroup("v3", "tmp.profile_tag03", tagColumns(3))
  )

  /** SQL registering `group.view` as the `dt = partition` slice of `group.table`. */
  private def tagViewSql(group: TagGroup, partition: String): String =
    s"""
       |create or replace temporary view ${group.view} as
       |select guid, ${group.columns.mkString(", ")}
       |from ${group.table}
       |where dt='$partition'
       |""".stripMargin

  /** SQL registering `ids`: the distinct guids (plain `union` dedups) appearing in any tag view. */
  private def idsViewSql(groups: Seq[TagGroup]): String =
    groups
      .map(g => s"select guid from ${g.view}")
      .mkString("create or replace temporary view ids as\n", "\nunion\n", "\n")

  /** SQL selecting every tag column per guid, left-joining each tag view onto `ids`. */
  private def mergedSelectSql(groups: Seq[TagGroup]): String = {
    val selected = "ids.guid" +: groups.flatMap(g => g.columns.map(c => s"${g.view}.$c"))
    val joins = groups.map(g => s"left join ${g.view} on ids.guid = ${g.view}.guid")
    s"select ${selected.mkString(", ")}\nfrom ids\n${joins.mkString("\n")}\n"
  }

  /**
   * Entry point.
   *
   * @param args `args(0)` = Elasticsearch index name, `args(1)` = Hive `dt` partition value.
   *             Exits with status 1 if arguments are missing or the partition value is malformed.
   */
  def main(args: Array[String]): Unit = {

    if (args.length < 2) {
      System.err.println(
        """
          |参数缺失，用法：ProfileEsLoader <索引库名> <分区日期>
          |  索引库名：写入的 Elasticsearch 索引名
          |  分区日期：Hive 表 dt 分区的值
          |""".stripMargin)
      System.exit(1)
    }

    val indexName = args(0)
    val partition = args(1)
    if (!partition.matches(PartitionPattern)) {
      System.err.println(s"非法的分区日期: $partition")
      System.exit(1)
    }

    val conf = new SparkConf()
      // Elasticsearch connector (es-hadoop) settings
      .set("es.index.auto.create", "true")
      .set("es.nodes", "doitedu")
      .set("es.port", "9200")
      // Connect only through the declared es.nodes instead of discovering other cluster nodes.
      .set("es.nodes.wan.only", "true")

    val spark = SparkSession.builder()
      .appName("多易教育,永远良心的大数据培训")
      // .master("local") // uncomment for local debugging
      .config(conf)
      // NOTE(review): one shuffle partition presumably assumes a small merged result; it likely
      // also means the ES write runs as a single task — revisit if profile volume grows.
      .config("spark.sql.shuffle.partitions", "1")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Register one temp view per tag table for the requested partition.
      TagGroups.foreach(g => spark.sql(tagViewSql(g, partition)))
      spark.sql(idsViewSql(TagGroups))

      val merged: DataFrame = spark.sql(mergedSelectSql(TagGroups))

      // Write the merged profile rows to Elasticsearch, keyed by guid.
      EsSparkSQL.saveToEs(merged, indexName, Map("es.mapping.id" -> "guid"))
    } finally {
      // Release the session even when a query or the ES write fails.
      spark.close()
    }
  }
}
